TwitterStream bugfixes

* do not run disabled `TwitterStreamAgents`
* calling `sleep` within the EM loop blocks the EM thread and thereby
  blocks the reload callback and proper shutdown of the threaded worker
* stopping the EM loop from outside of the `EM.run` blocks does not seem
  to work reliably, using a timer and checking if shutdown was requested
  works as expected
* removed unnecessary sleep at the end of the EM loop which would delay
  the shutdown of the thread

Dominik Sander 10 years ago
parent
commit
d529872e37
2 changed files with 10 additions and 8 deletions
  1. 2 0
      app/models/agent.rb
  2. 8 8
      lib/twitter_stream.rb

+ 2 - 0
app/models/agent.rb

@@ -55,6 +55,8 @@ class Agent < ActiveRecord::Base
55 55
   has_many :scenario_memberships, :dependent => :destroy, :inverse_of => :agent
56 56
   has_many :scenarios, :through => :scenario_memberships, :inverse_of => :agents
57 57
 
58
+  scope :active, -> { where(disabled: false) }
59
+
58 60
   scope :of_type, lambda { |type|
59 61
     type = case type
60 62
              when String, Symbol, Class

+ 8 - 8
lib/twitter_stream.rb

@@ -11,7 +11,6 @@ class TwitterStream
11 11
 
12 12
   def stop
13 13
     @running = false
14
-    EventMachine::stop_event_loop if EventMachine.reactor_running?
15 14
   end
16 15
 
17 16
   def stream!(filters, agent, &block)
@@ -91,9 +90,13 @@ class TwitterStream
91 90
   def run
92 91
     while @running
93 92
       begin
94
-        agents = Agents::TwitterStreamAgent.all
93
+        agents = Agents::TwitterStreamAgent.active.all
95 94
 
96 95
         EventMachine::run do
96
+          EventMachine.add_periodic_timer(1) {
97
+            EventMachine::stop_event_loop if !@running
98
+          }
99
+
97 100
           EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
98 101
             puts "Reloading EventMachine and all Agents..."
99 102
             EventMachine::stop_event_loop
@@ -101,17 +104,14 @@ class TwitterStream
101 104
 
102 105
           if agents.length == 0
103 106
             puts "No agents found.  Will look again in a minute."
104
-            sleep 60
105
-            EventMachine::stop_event_loop
107
+            EventMachine.add_timer(60) {
108
+              EventMachine::stop_event_loop
109
+            }
106 110
           else
107 111
             puts "Found #{agents.length} agent(s).  Loading them now..."
108 112
             load_and_run agents
109 113
           end
110 114
         end
111
-
112
-        print "Pausing..."; STDOUT.flush
113
-        sleep 1
114
-        puts "done."
115 115
       rescue SignalException, SystemExit
116 116
         @running = false
117 117
         EventMachine::stop_event_loop if EventMachine.reactor_running?